/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.fnexecution.control;



import com.bff.gaia.unified.model.fnexecution.v1.UnifiedFnApi.Target;

import com.bff.gaia.unified.model.pipeline.v1.RunnerApi.Environment;

import com.bff.gaia.unified.runners.core.construction.graph.ExecutableStage;

import com.bff.gaia.unified.runners.fnexecution.GrpcFnServer;

import com.bff.gaia.unified.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;

import com.bff.gaia.unified.runners.fnexecution.data.GrpcDataService;

import com.bff.gaia.unified.runners.fnexecution.environment.EnvironmentFactory;

import com.bff.gaia.unified.runners.fnexecution.environment.RemoteEnvironment;

import com.bff.gaia.unified.runners.fnexecution.state.GrpcStateService;

import com.bff.gaia.unified.runners.fnexecution.state.StateRequestHandler;

import com.bff.gaia.unified.runners.fnexecution.environment.DockerEnvironmentFactory;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.fn.IdGenerator;

import com.bff.gaia.unified.sdk.fn.data.FnDataReceiver;

import com.bff.gaia.unified.sdk.util.WindowedValue;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Iterables;



import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ConcurrentMap;



/**

 * A {@link JobBundleFactory} which can manage a single instance of an {@link Environment}.

 *

 * @deprecated replace with a {@link DefaultJobBundleFactory} when appropriate if the {@link

 *     EnvironmentFactory} is a {@link

 *     DockerEnvironmentFactory}, or create an

 *     {@code InProcessJobBundleFactory} and inline the creation of the environment if appropriate.

 */

@Deprecated

public class SingleEnvironmentInstanceJobBundleFactory implements JobBundleFactory {

  public static JobBundleFactory create(

      EnvironmentFactory environmentFactory,

      GrpcFnServer<GrpcDataService> data,

      GrpcFnServer<GrpcStateService> state,

      IdGenerator idGenerator) {

    return new SingleEnvironmentInstanceJobBundleFactory(

        environmentFactory, data, state, idGenerator);

  }



  private final EnvironmentFactory environmentFactory;



  private final GrpcFnServer<GrpcDataService> dataService;

  private final GrpcFnServer<GrpcStateService> stateService;



  private final ConcurrentMap<ExecutableStage, StageBundleFactory> stageBundleFactories =

      new ConcurrentHashMap<>();

  private final ConcurrentMap<Environment, RemoteEnvironment> environments =

      new ConcurrentHashMap<>();



  private final IdGenerator idGenerator;



  private SingleEnvironmentInstanceJobBundleFactory(

      EnvironmentFactory environmentFactory,

      GrpcFnServer<GrpcDataService> dataService,

      GrpcFnServer<GrpcStateService> stateService,

      IdGenerator idGenerator) {

    this.environmentFactory = environmentFactory;

    this.dataService = dataService;

    this.stateService = stateService;

    this.idGenerator = idGenerator;

  }



  @Override

  public StageBundleFactory forStage(ExecutableStage executableStage) {

    return stageBundleFactories.computeIfAbsent(executableStage, this::createBundleFactory);

  }



  private StageBundleFactory createBundleFactory(ExecutableStage stage) {

    RemoteEnvironment remoteEnv =

        environments.computeIfAbsent(

            stage.getEnvironment(),

            env -> {

              try {

                return environmentFactory.createEnvironment(env);

              } catch (Exception e) {

                throw new RuntimeException(e);

              }

            });

    SdkHarnessClient sdkHarnessClient =

        SdkHarnessClient.usingFnApiClient(

                remoteEnv.getInstructionRequestHandler(), dataService.getService())

            .withIdGenerator(idGenerator);

    ExecutableProcessBundleDescriptor descriptor;

    try {

      descriptor =

          ProcessBundleDescriptors.fromExecutableStage(

              idGenerator.getId(),

              stage,

              dataService.getApiServiceDescriptor(),

              stateService.getApiServiceDescriptor());

    } catch (IOException e) {

      throw new RuntimeException(e);

    }

    SdkHarnessClient.BundleProcessor bundleProcessor =

        sdkHarnessClient.getProcessor(

            descriptor.getProcessBundleDescriptor(),

            descriptor.getRemoteInputDestinations(),

            stateService.getService());

    return new BundleProcessorStageBundleFactory(descriptor, bundleProcessor);

  }



  @Override

  public void close() throws Exception {

    Exception thrown = null;

    for (RemoteEnvironment remoteEnvironment : environments.values()) {

      try {

        remoteEnvironment.close();

      } catch (Exception e) {

        if (thrown == null) {

          thrown = e;

        } else {

          thrown.addSuppressed(e);

        }

      }

    }

    if (thrown != null) {

      throw thrown;

    }

  }



  private static class BundleProcessorStageBundleFactory implements StageBundleFactory {

    private final ExecutableProcessBundleDescriptor descriptor;

    private final SdkHarnessClient.BundleProcessor processor;



    private BundleProcessorStageBundleFactory(

		ExecutableProcessBundleDescriptor descriptor, SdkHarnessClient.BundleProcessor processor) {

      this.descriptor = descriptor;

      this.processor = processor;

    }



    @Override

    public RemoteBundle getBundle(

        OutputReceiverFactory outputReceiverFactory,

        StateRequestHandler stateRequestHandler,

        BundleProgressHandler progressHandler) {

      Map<Target, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();

      for (Map.Entry<Target, Coder<WindowedValue<?>>> targetCoders :

          descriptor.getOutputTargetCoders().entrySet()) {

        String bundleOutputPCollection =

            Iterables.getOnlyElement(

                descriptor

                    .getProcessBundleDescriptor()

                    .getTransformsOrThrow(targetCoders.getKey().getPrimitiveTransformReference())

                    .getInputsMap()

                    .values());

        FnDataReceiver<WindowedValue<?>> outputReceiver =

            outputReceiverFactory.create(bundleOutputPCollection);

        outputReceivers.put(

            targetCoders.getKey(),

            RemoteOutputReceiver.of(targetCoders.getValue(), outputReceiver));

      }

      return processor.newBundle(outputReceivers, stateRequestHandler, progressHandler);

    }



    @Override

    public ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {

      return descriptor;

    }



    @Override

    public void close() {}

  }

}